package com.niit.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Spark_Stream_UpdateSateByKey {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
    val ssc = new StreamingContext(sparkConf,Seconds(3))
    ssc.sparkContext.setLogLevel("ERROR")
    ssc.checkpoint("BD2")//设置检查点，要让缓冲区的数据落盘，为了可以在发生异常的情况进行数据的恢复

    val datas = ssc.socketTextStream("localhost", 9999)

    var wordOne = datas.map( (_,1) )

    /*
    updateStateByKey:根据Key对状态（值）进行更新
     第一个参数：表示相同Key的value数据
     第二个参数：表示缓冲区中相同Key的value数据
     */
    val state =  wordOne.updateStateByKey(
      (seq:Seq[Int],buff:Option[Int])=>{//(hello,1) (hello,1) (hello,1) (hello,1) => seq.sum =>(hello,4)
         //如果当前buff没有值则返回0，如果有值则返回当前值
        val newCount =  buff.getOrElse(0)  + seq.sum
        Option(newCount)
      }
    )

    state.print()

    ssc.start()
    ssc.awaitTermination()



  }

}
